package com.xiaojie.hadoop.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import java.io.IOException;
import java.util.List;

/**
 * Static helper methods wrapping common HBase admin and data operations.
 *
 * <p>All methods share a single {@link Connection} that is created when this class is loaded,
 * using the ZooKeeper quorum {@code hadoop1,hadoop2,hadoop3} on client port {@code 2181}.
 * Class loading fails with a {@link RuntimeException} if that connection cannot be created.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 */
@Slf4j
public class HBaseUtil {

    /** Start row (inclusive) scanned by {@link #getAll(String, String)}. */
    private static final String DEFAULT_SCAN_START_ROW = "1002";

    /** Stop row (exclusive) scanned by {@link #getAll(String, String)}. */
    private static final String DEFAULT_SCAN_STOP_ROW = "1003";

    /** Class name of the coprocessor attached by {@link #useCoprocessor(String, List)}. */
    private static final String COPROCESSOR_CLASS = "com.xiaojie.hadoop.hbase.coprocessor.MyCoprocessor";

    /** Connection shared by every method of this class; created once at class-load time. */
    private static final Connection connection;

    static {
        Configuration configuration = HBaseConfiguration.create();
        // ZooKeeper client port
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // ZooKeeper quorum hosts
        configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a namespace.
     *
     * @param namespace name of the namespace to create
     * @return {@code true} when the namespace was created
     * @throws RuntimeException wrapping the {@link IOException} raised by the admin call
     */
    public static boolean createNameSpace(String namespace) {
        // try-with-resources so the Admin handle is released even when the call fails
        try (Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
            log.info(">>>>>>>>>>>>创建namespace成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a namespace.
     *
     * @param namespace name of the namespace to delete
     * @return {@code true} when the namespace was deleted
     * @throws RuntimeException wrapping the {@link IOException} raised by the admin call
     */
    public static boolean deleteNameSpace(String namespace) {
        try (Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(namespace);
            log.info(">>>>>>>>>>>>删除namespace成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a table with the given column families (one version kept per cell) and the
     * {@link IncreasingToUpperBoundRegionSplitPolicy} split policy, unless it already exists.
     *
     * @param tableName      table qualifier (without namespace)
     * @param columnFamilies names of the column families to create
     * @param nameSpace      namespace of the table; per the original author's note an empty
     *                       value selects the default namespace
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IOException if an admin call fails
     */
    public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {
        // The existence check and the creation must target the same namespace-qualified name.
        TableName name = TableName.valueOf(nameSpace, tableName);
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(name)) {
                log.info("table exists>>>>>>>>");
                return false;
            }
            TableDescriptor descriptor = newDescriptorBuilder(name, columnFamilies)
                    .setValue(TableDescriptorBuilder.SPLIT_POLICY, IncreasingToUpperBoundRegionSplitPolicy.class.getName())
                    .build();
            admin.createTable(descriptor);
            return true;
        }
    }

    /**
     * Disables (when still enabled) and then deletes a table.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @return {@code true} when the table was deleted
     * @throws RuntimeException wrapping the {@link IOException} raised by an admin call
     */
    public static boolean deleteTable(String tableName) {
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            // A table has to be disabled before deletion; skip the step if it already is,
            // because disabling a disabled table raises an error.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes a single cell.
     *
     * @param nameSpace        namespace of the table
     * @param tableName        table qualifier
     * @param rowKey           row key
     * @param columnFamilyName column family
     * @param qualifier        column qualifier
     * @param value            value to store
     * @return {@code true} when the put succeeded
     * @throws RuntimeException wrapping the {@link IOException} raised by the put or the close
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, String qualifier,
                                 String value) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            log.info(">>>>>>>插入数据成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes several cells of one column family into a single row with one put.
     *
     * @param nameSpace        namespace of the table
     * @param tableName        table qualifier
     * @param rowKey           row key
     * @param columnFamilyName column family
     * @param pairList         (qualifier, value) pairs to store
     * @return {@code true} when the put succeeded
     * @throws RuntimeException wrapping the {@link IOException} raised by the put or the close
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            byte[] family = Bytes.toBytes(columnFamilyName);
            Put put = new Put(Bytes.toBytes(rowKey));
            for (Pair<String, String> pair : pairList) {
                put.addColumn(family, Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond()));
            }
            table.put(put);
            log.info(">>>>>>>插入数据成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches one row by its row key.
     *
     * @param nameSpace namespace of the table
     * @param tableName table qualifier
     * @param rowKey    row key
     * @return the {@link Result} returned by the get
     * @throws RuntimeException wrapping the {@link IOException} raised by the get or the close
     */
    public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            return table.get(new Get(Bytes.toBytes(rowKey)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Scans the fixed demo row range from {@code "1002"} up to (not including) {@code "1003"}.
     *
     * <p>Despite its name this method does not scan the whole table; use
     * {@link #getAll(String, String, String, String)} with {@code null} bounds for that.
     *
     * @param nameSpace namespace of the table
     * @param tableName table qualifier
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getAll(String nameSpace, String tableName) {
        return getAll(nameSpace, tableName, DEFAULT_SCAN_START_ROW, DEFAULT_SCAN_STOP_ROW);
    }

    /**
     * Scans a row-key range with block caching enabled.
     *
     * @param nameSpace namespace of the table
     * @param tableName table qualifier
     * @param startRow  first row key of the range, or {@code null} to leave the start unset
     * @param stopRow   row key at which the scan stops (the original author's note states this
     *                  row is not returned), or {@code null} to leave the end unset
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getAll(String nameSpace, String tableName, String startRow, String stopRow) {
        Scan scan = new Scan();
        scan.setCacheBlocks(true);
        if (startRow != null) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (stopRow != null) {
            scan.withStopRow(Bytes.toBytes(stopRow));
        }
        return openScanner(TableName.valueOf(nameSpace, tableName), scan);
    }

    /**
     * Scans a table with a caller-supplied filter list.
     *
     * @param nameSpace  namespace of the table
     * @param tableName  table qualifier
     * @param filterList filters applied to the scan
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {
        Scan scan = new Scan();
        scan.setFilter(filterList);
        return openScanner(TableName.valueOf(nameSpace, tableName), scan);
    }

    /**
     * Reads the value of a single cell as a string.
     *
     * @param nameSpace    namespace of the table
     * @param tableName    table qualifier
     * @param rowKey       row key
     * @param columnFamily column family
     * @param qualifier    column qualifier
     * @return the cell value decoded by {@link Bytes#toString(byte[])}; {@code null} when the
     *         result holds no value for that column
     * @throws RuntimeException wrapping the {@link IOException} raised by the get or the close
     */
    public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily, String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            byte[] family = Bytes.toBytes(columnFamily);
            byte[] column = Bytes.toBytes(qualifier);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(family, column);
            Result result = table.get(get);
            return Bytes.toString(result.getValue(family, column));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a whole row. Failures are logged rather than thrown.
     *
     * @param nameSpace namespace of the table
     * @param tableName table qualifier
     * @param rowKey    row key
     * @return {@code true} when the delete succeeded, {@code false} when the delete or the
     *         table close raised an {@link IOException}
     */
    public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            return true;
        } catch (IOException e) {
            // Best-effort contract: never throw, but do not report success either.
            log.error("Failed to delete row {} from {}:{}", rowKey, nameSpace, tableName, e);
            return false;
        }
    }

    /**
     * Deletes one column of a row. Failures are logged rather than thrown.
     *
     * @param nameSpace  namespace of the table
     * @param tableName  table qualifier
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier
     * @return {@code true} when the delete succeeded, {@code false} when the delete or the
     *         table close raised an {@link IOException}
     */
    public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,
                                       String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            // Best-effort contract: never throw, but do not report success either.
            log.error("Failed to delete column {}:{} of row {} from {}:{}",
                    familyName, qualifier, rowKey, nameSpace, tableName, e);
            return false;
        }
    }

    /* ------------------------------ Filter usage examples ------------------------------ */

    /**
     * Scans with a {@link RowFilter} that keeps rows whose key compares greater than or equal
     * to {@code rowKey} under a {@link BinaryComparator}.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @param rowKey    row key to compare against
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByRowFilter(String tableName, String rowKey) {
        Filter filter = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
                new BinaryComparator(Bytes.toBytes(rowKey)));
        return scanWithFilter(tableName, filter);
    }

    /**
     * Scans with a {@link QualifierFilter} matching columns whose qualifier equals
     * {@code qualifierName}, i.e. effectively reading a single column.
     *
     * @param tableName     table name as accepted by {@link TableName#valueOf(String)}
     * @param qualifierName column qualifier to match
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByQualifierFilter(String tableName, String qualifierName) {
        Filter filter = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(qualifierName)));
        return scanWithFilter(tableName, filter);
    }

    /**
     * Scans with a {@link DependentColumnFilter} on the given reference column, comparing its
     * value with {@link CompareOperator#LESS} against {@code value}.
     *
     * @param tableName     table name as accepted by {@link TableName#valueOf(String)}
     * @param familyName    column family of the reference column
     * @param qualifierName qualifier of the reference column
     * @param value         value the reference column is compared with
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByDependentColumnFilter(String tableName, String familyName, String qualifierName, String value) {
        // Arguments: family, qualifier, dropDependentColumn, operator, comparator.
        // dropDependentColumn=false keeps the reference column's value in the result
        // (per the original author's note, true would drop it).
        Filter filter = new DependentColumnFilter(Bytes.toBytes(familyName), Bytes.toBytes(qualifierName),
                false, CompareOperator.LESS, new BinaryComparator(Bytes.toBytes(value)));
        return scanWithFilter(tableName, filter);
    }

    /**
     * Scans with a {@link ValueFilter} that keeps cells whose value does NOT start with
     * {@code value} (NOT_EQUAL under a {@link BinaryPrefixComparator}). Row keys are not matched.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @param value     value prefix to exclude
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByValueFilter(String tableName, String value) {
        Filter filter = new ValueFilter(CompareOperator.NOT_EQUAL, new BinaryPrefixComparator(Bytes.toBytes(value)));
        return scanWithFilter(tableName, filter);
    }

    /**
     * Scans with a {@link FamilyFilter} matching the column family named {@code value}.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @param value     column family name to match
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByFamilyFilter(String tableName, String value) {
        Filter filter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(value)));
        return scanWithFilter(tableName, filter);
    }

    /**
     * Scans with a {@link PrefixFilter}, which selects rows based on a row-key prefix.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @param prefix    row-key prefix
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByPrefixFilter(String tableName, String prefix) {
        return scanWithFilter(tableName, new PrefixFilter(Bytes.toBytes(prefix)));
    }

    /**
     * Scans with a {@link ColumnPrefixFilter}, which selects columns based on a prefix of the
     * column qualifier.
     *
     * @param tableName table name as accepted by {@link TableName#valueOf(String)}
     * @param prefix    column-qualifier prefix
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByColumnPrefixFilter(String tableName, String prefix) {
        return scanWithFilter(tableName, new ColumnPrefixFilter(Bytes.toBytes(prefix)));
    }

    /**
     * Scans with several filters combined under {@link FilterList.Operator#MUST_PASS_ONE}:
     * a row-key prefix filter, a row filter using the regex {@code ^\w{3}$}, and a column
     * family filter.
     *
     * @param tableName  table name as accepted by {@link TableName#valueOf(String)}
     * @param prefix     row-key prefix for the prefix filter
     * @param familyName column family name for the family filter
     * @return an open scanner; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link IOException} raised while opening the scanner
     */
    public static ResultScanner getDataByMultiFilter(String tableName, String prefix, String familyName) {
        Filter prefixFilter = new PrefixFilter(Bytes.toBytes(prefix));
        RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^\\w{3}$"));
        FamilyFilter familyFilter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(familyName)));
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, prefixFilter, rowFilter, familyFilter);
        return scanWithFilter(tableName, filterList);
    }

    /**
     * Creates a table with the {@code MyCoprocessor} coprocessor attached, unless the table
     * already exists.
     *
     * @param tableName      table name as accepted by {@link TableName#valueOf(String)}
     * @param columnFamilies names of the column families to create
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IOException if an admin call or attaching the coprocessor fails
     */
    public static boolean useCoprocessor(String tableName, List<String> columnFamilies) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(name)) {
                log.info("table exists>>>>>>>>");
                return false;
            }
            TableDescriptorBuilder builder = newDescriptorBuilder(name, columnFamilies);
            // Attach the coprocessor by class name.
            builder.setCoprocessor(COPROCESSOR_CLASS);
            admin.createTable(builder.build());
            return true;
        }
    }

    /**
     * Creates a pre-split table, unless the table already exists.
     *
     * <p>The split keys come from {@code RegionUtil.genRegionKey(regionCount)}. Per the original
     * author's note, the number of regions equals the number of split keys plus one (three
     * regions need two split keys).
     *
     * @param tableName      table name as accepted by {@link TableName#valueOf(String)}
     * @param columnFamilies names of the column families to create
     * @param regionCount    number of regions, passed to {@code RegionUtil.genRegionKey}
     * @return {@code true} if the table was created, {@code false} if it already existed
     * @throws IOException if an admin call fails
     */
    public static boolean createTableRegion(String tableName, List<String> columnFamilies, int regionCount) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(name)) {
                log.info("table exists>>>>>>>>");
                return false;
            }
            TableDescriptor descriptor = newDescriptorBuilder(name, columnFamilies).build();
            // Generate the region split keys.
            byte[][] splitKeys = RegionUtil.genRegionKey(regionCount);
            admin.createTable(descriptor, splitKeys);
            return true;
        }
    }

    /**
     * Writes a single cell using a row key derived from {@code rowKey} by
     * {@code RegionUtil.genRegionNum(rowKey, regionCount)}.
     *
     * @param tableName        table name as accepted by {@link TableName#valueOf(String)}
     * @param rowKey           original row key, passed to {@code RegionUtil.genRegionNum}
     * @param columnFamilyName column family
     * @param qualifier        column qualifier
     * @param value            value to store
     * @param regionCount      number of regions, passed to {@code RegionUtil.genRegionNum}
     * @return {@code true} when the put succeeded
     * @throws RuntimeException wrapping the {@link IOException} raised by the put or the close
     */
    public static boolean putRegionRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
                                       String value, int regionCount) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Build the row key that is actually stored.
            String row = RegionUtil.genRegionNum(rowKey, regionCount);
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            log.info(">>>>>>>插入数据成功");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds a table descriptor builder with one single-version column family per name. */
    private static TableDescriptorBuilder newDescriptorBuilder(TableName name, List<String> columnFamilies) {
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
        for (String family : columnFamilies) {
            ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(family))
                    .setMaxVersions(1)
                    .build();
            builder.setColumnFamily(familyDescriptor);
        }
        return builder;
    }

    /** Scans the table named {@code tableName} with a single filter applied. */
    private static ResultScanner scanWithFilter(String tableName, Filter filter) {
        Scan scan = new Scan();
        scan.setFilter(filter);
        return openScanner(TableName.valueOf(tableName), scan);
    }

    /**
     * Opens a scanner, wrapping any {@link IOException} in a {@link RuntimeException}.
     *
     * <p>On success the table handle is left open, as in the original code: whether it may be
     * closed before the returned scanner has been consumed is not established here
     * (NOTE(review): confirm against the HBase client in use). If opening the scanner fails,
     * the handle is closed so it does not leak.
     */
    private static ResultScanner openScanner(TableName name, Scan scan) {
        Table table = null;
        try {
            table = connection.getTable(name);
            return table.getScanner(scan);
        } catch (IOException e) {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new RuntimeException(e);
        }
    }
}
